package com.zhengbo.simplerpc.client;

import com.zhengbo.simplerpc.common.MessageDecoder;
import com.zhengbo.simplerpc.common.MessageEncoder;
import com.zhengbo.simplerpc.common.MessageOutput;
import com.zhengbo.simplerpc.common.MessageRegistry;
import com.zhengbo.simplerpc.utils.UUIDUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Netty-based RPC client bound to a single {@code ip:port} endpoint.
 *
 * <p>The pipeline is configured eagerly in the constructor (via {@link #init()}), but the
 * first connection is only opened lazily by the first {@link #sendAsync(String, Object)}
 * call. {@link #reconnect()} retries the connection once per second until it succeeds or
 * the client has been closed.
 *
 * <p>The lifecycle flags are {@code volatile} because they are written by caller threads
 * and read from the event loop thread (inside the connect listener).
 *
 * Created by zhengbo on 2019/8/22.
 */
@Slf4j
public class RpcClient {

    private final String ip;

    private final int port;

    private Bootstrap bootstrap;

    private EventLoopGroup eventLoopGroup;

    private ClientMessageCollector messageCollector;

    /** Guards the lazy first connect so concurrent senders open only one connection. */
    private final Object connectLock = new Object();

    /** Set once the first blocking connect has completed successfully. */
    private volatile boolean started;

    /** Set by {@link #close()}; stops any further reconnect attempts. */
    private volatile boolean stopped;

    private final MessageRegistry registry = new MessageRegistry();

    /**
     * Creates a client for the given endpoint and prepares the Netty bootstrap.
     * No connection is opened here.
     *
     * @param ip   host name or address of the server
     * @param port TCP port of the server
     */
    public RpcClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
        this.init();
    }

    /**
     * Registers a message type name with its class in the shared {@link MessageRegistry}.
     *
     * @param name     message type name
     * @param reqClass class associated with that name
     * @return this client, to allow chained registration
     */
    public RpcClient addRegistry(String name, Class<?> reqClass) {
        registry.registry(name, reqClass);
        return this;
    }

    /**
     * Marks the client as stopped, closes the message collector and shuts down the
     * event loop group.
     */
    public void close() {
        stopped = true;
        messageCollector.close();
        // No quiet period; wait at most 5 seconds for the event loop to terminate.
        // (The previous value of 5000 with TimeUnit.SECONDS was a unit slip.)
        eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
    }

    /** Opens the connection and blocks until the attempt completes; rethrows on failure. */
    private void connect() {
        bootstrap.connect(ip, port).syncUninterruptibly();
    }

    /**
     * Performs the blocking first connect exactly once, even when several threads send
     * concurrently. If the connect fails, {@code started} stays {@code false} so a later
     * call retries.
     */
    private void ensureConnected() {
        if (started) {
            return;
        }
        synchronized (connectLock) {
            if (!started) {
                this.connect();
                started = true;
            }
        }
    }

    /**
     * Sends a request without waiting for the response.
     *
     * <p>Connects first if this is the first send. Each request gets a fresh id from
     * {@link UUIDUtils#getUUIDString()}.
     *
     * @param type    message type name
     * @param payLoad request payload
     * @param <T>     expected response type
     * @return the future returned by the message collector for this request
     */
    public <T> RpcFuture<T> sendAsync(String type, Object payLoad) {
        ensureConnected();
        String requestId = UUIDUtils.getUUIDString();
        MessageOutput messageOutput = new MessageOutput(requestId, type, payLoad);
        return messageCollector.send(messageOutput);
    }

    /**
     * Asynchronously re-opens the connection, retrying every second on failure until the
     * attempt succeeds or the client has been closed. Does nothing once the client is
     * stopped.
     */
    // NOTE(review): presumably invoked by ClientMessageCollector when the channel drops
    // (it is handed this client in init()) - confirm against that class.
    public void reconnect() {
        if (stopped) {
            return;
        }
        bootstrap.connect(ip, port).addListener(future -> {
            if (future.isSuccess()) {
                return;
            }
            log.error("fail to connect:{},{}", ip, port, future.cause());
            // Re-check: close() may have been called while the attempt was in flight.
            if (!stopped) {
                eventLoopGroup.schedule(this::reconnect, 1, TimeUnit.SECONDS);
            }
        });
    }

    /**
     * Sends a request and blocks until its response is available.
     *
     * @param type    message type name
     * @param payload request payload
     * @param <T>     expected response type
     * @return the response value
     * @throws RPCException if waiting is interrupted (the thread's interrupt status is
     *                      restored first) or the request completes exceptionally
     */
    public <T> T send(String type, Object payload) {
        RpcFuture<T> future = sendAsync(type, payload);
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Preserve the interrupt so callers further up the stack can still see it.
            Thread.currentThread().interrupt();
            throw new RPCException(e);
        } catch (ExecutionException e) {
            throw new RPCException(e);
        }
    }

    /**
     * Builds the bootstrap, a single-threaded NIO event loop group, the message collector
     * and the channel pipeline.
     *
     * <p>Called once from the constructor. Calling it again replaces the bootstrap and
     * event loop group without shutting down the previous group.
     */
    public void init() {
        bootstrap = new Bootstrap();
        eventLoopGroup = new NioEventLoopGroup(1);
        bootstrap.group(eventLoopGroup);
        MessageEncoder encoder = new MessageEncoder();
        messageCollector = new ClientMessageCollector(registry, this);

        bootstrap.channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // Fail the channel if nothing is read for 60 seconds.
                pipeline.addLast(new ReadTimeoutHandler(60));
                // A new decoder per channel; the encoder and collector instances are
                // reused for every channel (including reconnects).
                // NOTE(review): that reuse assumes both handlers are @Sharable - confirm.
                pipeline.addLast(new MessageDecoder());
                pipeline.addLast(encoder);
                pipeline.addLast(messageCollector);
            }
        });

        bootstrap.option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true);
    }
}
